import holoviews as hv
hv.extension('bokeh')
hv.opts.defaults(hv.opts.Curve(width=500),
hv.opts.Image(width=500, colorbar=True, cmap='Viridis'))
import numpy as np
import scipy.signal
import scipy.fft
from IPython.display import Audio
7. Sistemas para el procesamiento de señales¶
7.1. Introducción¶
7.1.1. Definición de sistema¶
Hasta ahora hemos realizado análisis de señales, es decir el estudio de las señales y sus propiedades en el dominio del tiempo y frecuencia
En esta unidad nos enfocaremos en el procesamiento de señales, es decir el diseño de sistemas que procesan una señal de entrada y producen una señal de salida
Usaremos
\(x[n]\) para denotar la señal (discreta) de entrada y \(X[k]\) su espectro
\(y[n]\) para denotar la señal (discreta) de salida e \(Y[k]\) su espectro
7.1.2. Ejemplos de sistemas¶
Utilizando sistemas podemos modificar una señal para mejorar su calidad o remover efectos indeseados
Un sistema para reducir el ruido de una señal de electroencefalograma (EEG)
Un sistema para mejorar una imagen fuera de foco (sharpening)
Un sistema para eliminar el eco de un audio
7.2. Propiedades y ejemplos de sistemas¶
7.2.1. Sistemas sin memoria¶
Diremos que un sistema \(\Phi\) es un sistema sin memoria si
es decir que la salida del sistema en un instante \(n\) dado depende solo de la entrada en ese mismo instante
Veamos algunos ejemplos
Sistema atenuador/amplificador ideal
donde \(A>0\) se llama ganancia
Este sistema puede atenuar la entrada si \(0<A<1\) y amplificar si \(A>1\)
Sistema saturador (clamp)
Este sistema limita los valores de la señal de entrada en un rango fijo
Sistema rectificador
Este sistema eliminar la parte negativa de la señal de entrada
7.2.2. Sistema Lineal¶
Diremos que un sistema \(\Phi\) es lineal si cumple con las siguientes propiedades
Homogeneidad
Un cambio en la amplitud de la entrada produce un cambio equivalente en la salida
Aditividad
Señales que se suman en la entrada producen señales que se suman en la salida
Es decir que las señales pasan por el sistema sin interactuar entre ellas
Otras propiedades de los sistemas lineales
Producto de las propiedades anteriores se tiene que una cascada de sistemas lineales forma un sistema lineal equivalente y además la cascada de sistemas es conmutativa, es decir que el orden de los sistemas en la cascada no altera el resultado final
La siguiente figura ejemplifica esta propiedad
Otra propiedad interesante de los sistemas lineales es que cumplen el Principio de superposición:
Si descomponemos una señal en \(M\) componentes: \(x[n] = x_1[n] + x_2[n] + \ldots + x_M[n]\)
Y aplicamos un sistema lineal a cada componente: \(y_j[n] = \Phi(x_j[n])\)
Podemos recuperar la salida total usando aditividad: \(y_1[n] + y_2[n] + \ldots + y_M[n] = y[n]\)
La siguiente figura ejemplifica esta propiedad
7.2.3. Sistemas con memoria¶
Un sistema \(\Phi\) es un sistema con memoria si su salida actual depende sólo de la entrada actual, las entradas anteriores o las salidas anteriores
esto también se conoce como sistema causal
Un sistema con memoria no-causal usa entradas futuras (es decir \(x[n+1]\), \(x[n+2]\), …) y por ende solo se puede implementar de forma offline, es decir una vez que sea ha observado toda la señal
A continuación veremos algunos ejemplos de sistemas con memoria causales
Sistema con un retardo (delay)
Definido como
donde
la salida depende solo de “una” entrada anterior
el valor de m define que tan “antigua” es la entrada pasada
El delay o retarno no afecta la amplitud de los componentes frecuenciales de la señal pero si su fase, como muestra la siguiente figura
n = np.arange(0, 200, step=1)
x = lambda m: np.sin(2.0*np.pi*0.05*(n-m))
f = scipy.fft.rfftfreq(d=1, n=len(n))
p = []
for m in [0, 4, 8]:
x_delayed = x(m)
p.append(hv.Curve((n, x_delayed), 'Tiempo [s]', 'Señal'))
X = scipy.fft.rfft(x_delayed)
Xm = np.absolute(X)
p.append(hv.Curve((f, Xm), 'Frecuencia [Hz]', 'Espectro de amplitud'))
Xp = np.angle(X)
p.append(hv.Curve((f, Xm*Xp/np.amax(Xm)), 'Frecuencia [Hz]', 'Espectro de fase'))
hv.Layout(p).cols(3).opts(hv.opts.Curve(width=250, height=200))
Sistema reverberador o eco
Definido como
donde
la salida depende de una entrada “pasada” y la entrada actual
la ganancia controla si el “eco” se atenua o amplifica
Al contrario del sistema anterior, el eco si puede modificar el espectro de amplitud.
Notemos el efecto de interferencia constructiva y destructiva al modificar el retardo, como muestra la siguiente animación
n = np.arange(0, 200, step=1)
x = lambda m, A=1: A*np.sin(2.0*np.pi*0.05*(n-m))
f = scipy.fft.rfftfreq(d=1, n=len(n))
buffer = {}
for m in range(0, 40, 2):
xm = x(m)
buffer[m] = (xm, np.abs(scipy.fft.rfft(x(0) + xm)))
hMap1 = hv.HoloMap(kdims='m')
hMap2 = hv.HoloMap(kdims='m')
hMap3 = hv.HoloMap(kdims='m')
for m, (xm, X) in buffer.items():
hMap1[m] = hv.Curve((n, xm), 'Tiempo', 'x', label='Ax[n-m]')
hMap2[m] = hv.Curve((n, x(0) + xm), 'Tiempo', 'y')
hMap3[m] = hv.Curve((f, X), 'Frecuencia', 'Espectro')
p_clean = hv.Curve((n, x(0)), 'Tiempo', label='x[n]').opts(width=500, height=200)
plot = (p_clean * hMap1 + hMap2 + hMap3).cols(1).opts(hv.opts.Curve(height=200),
hv.opts.Overlay(legend_position='top_right'))
hv.output(plot, holomap='gif', fps=5)
El siguiente video muestra un experimento que ejemplifica la interferencia destructiva en una onda mecánica: https://www.youtube.com/watch?v=IU8xeJlJ0mk
Sistemas con múltiples ecos
Pueden combinarse más retardos para hacer un sistema reverberante más complejo
Por ejemplo el siguiente sistema
que lo implementamos como
Fs = 44100;
n = np.arange(0, 4, step=1.0/Fs)
x = lambda m: np.sin(2.0*np.pi*880*(n-m))*np.exp(-(n-m)**2/0.5**2)*np.heaviside(n-m, 0)
y = x(0) + 0.5*x(1.) + 0.25*x(2.) + 0.125*x(3.)
Da como resultado gráfico lo siguiente:
hv.Curve((n, y), 'Tiempo [s]', 'Señal con eco')
y el resultado sonoro es:
Audio(y, rate=Fs, normalize=False)
7.3. Sistema de respuesta finita al impulso (FIR)¶
Generalizando el ejemplo de sistema lineal reverberante a \(L\) retardos llegamos a
que se puede modelar como una convolución discreta entre \(h\) y \(x\)
Este sistema se conoce como
Sistema FIR (finite impulse response)
Sistema MA (moving average)
Sistema todo-zeros
y es de orden L (posee L+1 coeficientes)
Intepretación como media movil (MA)
El sistema FIR es equivalente a una media movil ponderada que se aplica sobre la entrada, donde los coeficientes del sistema son los ponderadores
Por ejemplo sea un sistema de 3 coeficientes \(h[0]=a\), \(h[1]=b\) y \(h[2]=c\)
donde cada salida se calcula a partir de
Un detalle es que para obtener el valor de \(y[0]\) e \(y[1]\) se deben establecer “condiciones de borde”, como por ejemplo \(x[-2] = x[-1]= 0\)
A continuación veremos algunos ejemplos de aplicaciones usando filtros FIR sencillos
7.3.1. Eliminando ruido blanco aditivo¶
Si tenemos una señal contaminada con ruido blanco aditivo como la que se muestra a continuación
np.random.seed(0)
n = np.arange(0, 100, step=1)
C = 5*np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/10**2)
# Señal limpia (lo que no conocemos)
x_clean = np.random.multivariate_normal(np.zeros_like(n), C)
# Datos: Señal + ruido (lo que medimos)
x = x_clean + 2*np.random.randn(len(n))
hv.Scatter((n, x), 'Tiempo', 'Datos (señal + ruido)').opts(width=500, height=200)
Podemos usar un sistema FIR promediador para “suavizar la contaminación”
Sea un sistema FIR con \(L\) coeficientes idénticos e iguales a \(1/L\)
L = 10
h = np.ones(shape=(L,))/L
y = scipy.signal.convolve(x, h, mode='same', method='auto')
La siguiente gráfica interactiva muestra el resultado de convolucionar este sistema con los datos contaminados
hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')
for m in range(0, 100-L, 5):
c = np.zeros_like(n, dtype=np.float64);
c[m:m+L] = h
hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')
hMap2[m] = hv.Curve((n[:m], y[:m]), 'Tiempo', label='Señal filtrada')
p_clean = hv.Curve((n, x_clean), 'Tiempo', 'Señal', label='Señal limpia').opts(color='k', height=200)
(hMap1 + hMap2 * p_clean).cols(1).opts(hv.opts.Curve(height=200),
hv.opts.Overlay(legend_position='top_left'))
Nota
Este filtro promedia los datos vecinos resultando una versión suavizada de los mismos. Esta versión suavizada se aproxima a la “señal limpia” que está escondida en el ruido.
En general, mientras más “largo” sea el filtro mayor será el efecto de suavizado
7.3.2. Encontrando cambios en una señal¶
Sea la siguiente señal escalonada
n = np.arange(0, 100, step=1)
x = np.zeros_like(n, dtype=np.float64)
x[20:] += 1.
x[40:] += 1.
x[80:] -= 1.
hv.Curve((n, x), 'Tiempo', 'Datos').opts(width=500, height=200)
Si nos interesa encontrar cambios en la señal podemos usar un sistema diferenciador
h = np.array([0.5, -0.5])
y = scipy.signal.convolve(x, h, mode='same', method='auto')
La siguiente gráfica interactiva muestra el resultado de convolucionar este sistema con la señal escalonada
hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')
for m in range(0, 100-len(h), 5):
c = np.zeros_like(n, dtype=np.float64);
c[m:m+len(h)] = h
hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')
hMap2[m] = hv.Curve((n[:m], y[:m]), 'Tiempo', 'Convolución')
(hMap1 + hMap2).cols(1).opts(hv.opts.Curve(height=200))
Nota
Los pulsos en la convolución están asociados a un cambio (ascenso o descenso) en la señal original
En un caso más general, este filtro nos da información de la velocidad con que cambia la señal
7.3.3. Remover tendencias¶
En el siguiente ejemplo tenemos una señal sinusoidal “montada” sobre otra señal que cambia más lentamente.
np.random.seed(0);
n = np.arange(0, 100, step=1)
C = np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/30**2)
x = np.sin(2.0*np.pi*0.1*n) + 5*np.random.multivariate_normal(np.zeros_like(n), C)
hv.Curve((n, x), 'Tiempo', 'Señal').opts(height=200)
En general nos referimos a la señal lenta como la “tendencia”
A continuación veremos un filtro para separar la parte rápida (sinusoide) de la parte lenta (tendencia) de este señal de ejemplo. En próximas lecciones veremos como diseñar este tipo de filtros
L = 5
h = -np.ones(shape=(L,))/L
h[L//2] += 1
y = scipy.signal.convolve(x, h, mode='same', method='auto')
hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')
for m in range(0, 100-len(h), 5):
c = np.zeros_like(n, dtype=np.float64);
c[m:m+len(h)] = h
hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')
hMap2[m] = hv.Curve((n[0:m], y[0:m]), 'Tiempo', 'Convolución').opts(ylim=(-0.5, 0.5))
(hMap1 + hMap2).cols(1).opts(hv.opts.Curve(height=200))
7.4. Convolución con scipy¶
Podemos convolucionar una señal en Python usando
scipy.signal.convolve(in1, # Señal de entrada
in2, # Coeficientes del sistema
mode='full',
method='auto'
)
donde el argumento method puede ser
direct: Realiza la convolución en el dominio del tiempofft: Realiza la convolución multiplicando los espectrosauto: Se decide automaticamente en base al largo de las señales
y el argumento mode indica donde se hace la convolución
Para ejemplificar la influencia de este argumento consideremos una señal \(x=[a,b,c]\) y un sistema \(h=[d, e]\)
Si uso
mode=validel resultado será \(y=[ad+be, bd+ce]\)Si uso
mode=sameel resultado será \(y=[ae, ad+be, bd+ce]\), es decir se agregan ceros al principio de \(x\) tal que \(y\) sea del mismo largo que \(x\)Si uso
mode=fullel resultado será \(y=[ae, ad+be, bd+ce, cd]\), es decir se agregan ceros al principio y al final de \(x\)
7.4.1. Eliminando ruido versión 2.0¶
Considerando la misma señal contaminada con ruido blanco que vimos anteriormente utilizaremos scipy.signal.convolve para convolucionar con un filtro que suavice el ruido.
Probaremos dos sistemas FIR
coeficientes idénticos e iguales a \(1/L\), es decir una ventana rectangular
coeficientes que decaen suavemente a cero, como por ejemplo una ventana de Hamming
con distintos valores de \(L\) (largo del filtro)
En este caso los valores los obtenemos usando la función de scipy
scipy.signal.get_window(window, # String con el nombre de la ventana
Nx, # Entero con el largo de la ventana
...)
La ventana rectangular se llama boxcar mientras que la de Hamming se llama hamming. En la documentación de la función se puede revisar otras ventanas disponibles
# La señal de ejemplo de la sección 7.3.1
np.random.seed(0)
n = np.arange(0, 100, step=1)
C = 5*np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/10**2)
x_clean = np.random.multivariate_normal(np.zeros_like(n), C)
x = x_clean + 2*np.random.randn(len(n))
# La filtramos con distintas funciones y L para luego visualizar el resultado
filters = {}
results = {}
for L in [5, 10, 20, 30, 40]:
for filter_name in ['boxcar', 'hamming']:
h = scipy.signal.get_window(filter_name, L)
h = h/np.sum(h)
if not L in filters:
filters[L] = {}
results[L] = {}
filters[L][filter_name] = h
results[L][filter_name] = scipy.signal.convolve(x, h, mode='same', method='auto')
hMap1 = hv.HoloMap(kdims='L')
hMap2 = hv.HoloMap(kdims='L')
for L, curves in filters.items():
p = []
for filter_name, h in curves.items():
p.append(hv.Curve((range(L), h), 'Largo del filtro (L)', 'Filtro', label=filter_name))
hMap1[L] = hv.Overlay(p)
for L, curves in results.items():
p = []
for filter_name, y in curves.items():
p.append(hv.Curve((n, y), 'n', 'Señal', label=filter_name).opts(line_width=2, alpha=0.75))
hMap2[L] = hv.Overlay(p)
p_clean = hv.Curve((n, x_clean), 'Tiempo', 'Señal', label='Señal limpia').opts(color='k')
p_data = hv.Scatter((n, x), 'Tiempo', 'Datos (señal + ruido)').opts(width=500, height=200)
(p_data + hMap1 + hMap2 * p_clean).cols(1).opts(hv.opts.Curve(height=250),
hv.opts.Overlay(legend_position='top_right'))
Nota
Mientras más larga es la ventana (L) mayor es el suavizado. Adicionalmente la ventana de Hamming produce un filtrado más suave que la rectangular
En la lección siguiente veremos como diseñar un filtro, es decir como calcular los valores de h para resolver una tarea en particular